/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.values;



import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.schemas.Factory;

import com.bff.gaia.unified.sdk.schemas.FieldValueGetter;

import com.bff.gaia.unified.sdk.schemas.Schema;

import com.bff.gaia.unified.sdk.schemas.Schema.FieldType;

import com.bff.gaia.unified.sdk.schemas.Schema.LogicalType;

import com.bff.gaia.unified.sdk.schemas.Schema.TypeName;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Iterables;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Lists;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Maps;

import org.joda.time.DateTime;

import org.joda.time.Instant;

import org.joda.time.ReadableDateTime;

import org.joda.time.ReadableInstant;

import org.joda.time.base.AbstractInstant;



import javax.annotation.Nullable;

import java.io.Serializable;

import java.math.BigDecimal;

import java.nio.ByteBuffer;

import java.util.*;

import java.util.Map.Entry;

import java.util.stream.Collector;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkState;



/**
 * {@link Row} is an immutable tuple-like structure that represents one element in a
 * {@link PCollection}. The fields are described with a {@link Schema}.
 *
 * <p>{@link Schema} contains the names for each field and the coder for the whole record, see
 * {@link Schema#getRowCoder()}.
 */

@Experimental

public abstract class Row implements Serializable {

  private final Schema schema;



  /**
   * Creates a row described by {@code schema}.
   *
   * @param schema the schema stored in this row and later returned by {@link #getSchema()}
   */
  Row(Schema schema) {

    this.schema = schema;

  }



  // Abstract methods to be implemented by subclasses that handle object access.



  /**
   * Get value by field index.
   *
   * <p>The result is cast to the type the caller asks for, so a {@link ClassCastException} is
   * thrown at the call site if the stored value does not have that type.
   *
   * @param fieldIdx zero-based position of the field
   * @param <T> type the caller expects the value to have
   * @return the value of the field, possibly {@code null}
   */

  @Nullable

  @SuppressWarnings("TypeParameterUnusedInFormals")

  public abstract <T> T getValue(int fieldIdx);



  /**
   * Return the number of data fields in this row.
   *
   * <p>{@link #equals(Object)} and {@link #hashCode()} use this as the upper bound when iterating
   * over field positions.
   *
   * @return the field count
   */

  public abstract int getFieldCount();

  /**
   * Return the list of data values held by this row.
   *
   * <p>NOTE(review): presumably ordered by field position, matching {@link #getValue(int)} —
   * confirm against the concrete subclasses.
   *
   * @return the row's values
   */
  public abstract List<Object> getValues();



  /**
   * Looks up a field value by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getValue(int)}. A {@link ClassCastException} is thrown at the
   * call site if the stored value does not have the requested type.
   *
   * @param fieldName name of the field to read
   * @param <T> type the caller expects the value to have
   * @return the value of the field, possibly {@code null}
   */
  @Nullable
  @SuppressWarnings("TypeParameterUnusedInFormals")
  public <T> T getValue(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return this.<T>getValue(fieldIndex);
  }



  /**
   * Looks up a {@link TypeName#BYTE} field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getByte(int)}.
   *
   * @param fieldName name of the field to read
   * @return the value of the field, possibly {@code null}
   */
  @Nullable
  public Byte getByte(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return getByte(fieldIndex);
  }



  /**
   * Looks up a {@link TypeName#BYTES} field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getBytes(int)}.
   *
   * @param fieldName name of the field to read
   * @return the value of the field, possibly {@code null}
   */
  @Nullable
  public byte[] getBytes(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return getBytes(fieldIndex);
  }



  /**
   * Looks up a {@link TypeName#INT16} field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getInt16(int)}.
   *
   * @param fieldName name of the field to read
   * @return the value of the field, possibly {@code null}
   */
  @Nullable
  public Short getInt16(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return getInt16(fieldIndex);
  }



  /**
   * Looks up a {@link TypeName#INT32} field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getInt32(int)}.
   *
   * @param fieldName name of the field to read
   * @return the value of the field, possibly {@code null}
   */
  @Nullable
  public Integer getInt32(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return getInt32(fieldIndex);
  }



  /**
   * Looks up a {@link TypeName#INT64} field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getInt64(int)}.
   *
   * @param fieldName name of the field to read
   * @return the value of the field, possibly {@code null}
   */
  @Nullable
  public Long getInt64(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return getInt64(fieldIndex);
  }



  /**
   * Looks up a {@link TypeName#DECIMAL} field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getDecimal(int)}.
   *
   * @param fieldName name of the field to read
   * @return the value of the field, possibly {@code null}
   */
  @Nullable
  public BigDecimal getDecimal(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return getDecimal(fieldIndex);
  }



  /**
   * Looks up a {@link TypeName#FLOAT} field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getFloat(int)}.
   *
   * @param fieldName name of the field to read
   * @return the value of the field, possibly {@code null}
   */
  @Nullable
  public Float getFloat(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return getFloat(fieldIndex);
  }



  /**
   * Looks up a {@link TypeName#DOUBLE} field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getDouble(int)}.
   *
   * @param fieldName name of the field to read
   * @return the value of the field, possibly {@code null}
   */
  @Nullable
  public Double getDouble(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return getDouble(fieldIndex);
  }



  /**
   * Looks up a {@link TypeName#STRING} field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getString(int)}.
   *
   * @param fieldName name of the field to read
   * @return the value of the field, possibly {@code null}
   */
  @Nullable
  public String getString(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return getString(fieldIndex);
  }



  /**
   * Looks up a {@link TypeName#DATETIME} field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getDateTime(int)}.
   *
   * @param fieldName name of the field to read
   * @return the value of the field, possibly {@code null}
   */
  @Nullable
  public ReadableDateTime getDateTime(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return getDateTime(fieldIndex);
  }



  /**
   * Looks up a {@link TypeName#BOOLEAN} field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getBoolean(int)}.
   *
   * @param fieldName name of the field to read
   * @return the value of the field, possibly {@code null}
   */
  @Nullable
  public Boolean getBoolean(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return getBoolean(fieldIndex);
  }



  /**
   * Looks up an array field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getArray(int)}.
   *
   * @param fieldName name of the field to read
   * @param <T> element type the caller expects
   * @return the list stored in the field, possibly {@code null}
   */
  @Nullable
  public <T> List<T> getArray(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return this.<T>getArray(fieldIndex);
  }



  /**
   * Looks up a MAP field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getMap(int)}.
   *
   * @param fieldName name of the field to read
   * @param <T1> key type the caller expects
   * @param <T2> value type the caller expects
   * @return the map stored in the field, possibly {@code null}
   */
  @Nullable
  public <T1, T2> Map<T1, T2> getMap(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return this.<T1, T2>getMap(fieldIndex);
  }



  /**
   * Looks up a {@link TypeName#ROW} field by name.
   *
   * <p>The name is translated to a field position with {@code getSchema().indexOf(fieldName)} and
   * the read is delegated to {@link #getRow(int)}.
   *
   * @param fieldName name of the field to read
   * @return the nested row stored in the field, possibly {@code null}
   */
  @Nullable
  public Row getRow(String fieldName) {
    final int fieldIndex = getSchema().indexOf(fieldName);
    return getRow(fieldIndex);
  }



  /**
   * Reads the field at position {@code idx} as a {@link TypeName#BYTE} value.
   *
   * @param idx position of the field
   * @return the stored {@link Byte}, possibly {@code null}
   * @throws ClassCastException if the stored value is not a {@link Byte}
   */
  @Nullable
  public Byte getByte(int idx) {
    return this.<Byte>getValue(idx);
  }



  /**
   * Reads the field at position {@code idx} as a {@link TypeName#BYTES} value.
   *
   * @param idx position of the field
   * @return the stored byte array, possibly {@code null}
   * @throws ClassCastException if the stored value is not a {@code byte[]}
   */
  @Nullable
  public byte[] getBytes(int idx) {
    return this.<byte[]>getValue(idx);
  }



  /**
   * Reads the field at position {@code idx} as a {@link TypeName#INT16} value.
   *
   * @param idx position of the field
   * @return the stored {@link Short}, possibly {@code null}
   * @throws ClassCastException if the stored value is not a {@link Short}
   */
  @Nullable
  public Short getInt16(int idx) {
    return this.<Short>getValue(idx);
  }



  /**
   * Reads the field at position {@code idx} as a {@link TypeName#INT32} value.
   *
   * @param idx position of the field
   * @return the stored {@link Integer}, possibly {@code null}
   * @throws ClassCastException if the stored value is not an {@link Integer}
   */
  @Nullable
  public Integer getInt32(int idx) {
    return this.<Integer>getValue(idx);
  }



  /**
   * Reads the field at position {@code idx} as a {@link TypeName#FLOAT} value.
   *
   * @param idx position of the field
   * @return the stored {@link Float}, possibly {@code null}
   * @throws ClassCastException if the stored value is not a {@link Float}
   */
  @Nullable
  public Float getFloat(int idx) {
    return this.<Float>getValue(idx);
  }



  /**
   * Reads the field at position {@code idx} as a {@link TypeName#DOUBLE} value.
   *
   * @param idx position of the field
   * @return the stored {@link Double}, possibly {@code null}
   * @throws ClassCastException if the stored value is not a {@link Double}
   */
  @Nullable
  public Double getDouble(int idx) {
    return this.<Double>getValue(idx);
  }



  /**
   * Reads the field at position {@code idx} as a {@link TypeName#INT64} value.
   *
   * @param idx position of the field
   * @return the stored {@link Long}, possibly {@code null}
   * @throws ClassCastException if the stored value is not a {@link Long}
   */
  @Nullable
  public Long getInt64(int idx) {
    return this.<Long>getValue(idx);
  }



  /**
   * Reads the field at position {@code idx} as a {@link String}.
   *
   * @param idx position of the field
   * @return the stored {@link String}, possibly {@code null}
   * @throws ClassCastException if the stored value is not a {@link String}
   */
  @Nullable
  public String getString(int idx) {
    return this.<String>getValue(idx);
  }



  /**
   * Reads the field at position {@code idx} as a {@link TypeName#DATETIME} value.
   *
   * <p>The stored value is read as a {@link ReadableInstant}; a non-null instant is wrapped in a
   * new {@link DateTime} that is placed in the instant's own time zone.
   *
   * @param idx position of the field
   * @return a {@link DateTime} for the stored instant, or {@code null} if the field is null
   * @throws ClassCastException if the stored value is not a {@link ReadableInstant}
   */
  @Nullable
  public ReadableDateTime getDateTime(int idx) {
    final ReadableInstant stored = getValue(idx);
    if (stored == null) {
      return null;
    }
    return new DateTime(stored).withZone(stored.getZone());
  }



  /**
   * Reads the field at position {@code idx} as a {@link BigDecimal}.
   *
   * @param idx position of the field
   * @return the stored {@link BigDecimal}, possibly {@code null}
   * @throws ClassCastException if the stored value is not a {@link BigDecimal}
   */
  @Nullable
  public BigDecimal getDecimal(int idx) {
    return this.<BigDecimal>getValue(idx);
  }



  /**
   * Reads the field at position {@code idx} as a {@link Boolean}.
   *
   * @param idx position of the field
   * @return the stored {@link Boolean}, possibly {@code null}
   * @throws ClassCastException if the stored value is not a {@link Boolean}
   */
  @Nullable
  public Boolean getBoolean(int idx) {
    return this.<Boolean>getValue(idx);
  }



  /**
   * Reads the field at position {@code idx} as an array value.
   *
   * <p>Only the container type is checked here; because of type erasure the element type
   * {@code T} is not verified by this call.
   *
   * @param idx position of the field
   * @param <T> element type the caller expects
   * @return the stored list, possibly {@code null}
   * @throws ClassCastException if the stored value is not a {@link List}
   */
  @Nullable
  public <T> List<T> getArray(int idx) {
    return this.<List<T>>getValue(idx);
  }



  /**
   * Reads the field at position {@code idx} as a MAP value.
   *
   * <p>Only the container type is checked here; because of type erasure the key and value types
   * are not verified by this call.
   *
   * @param idx position of the field
   * @param <T1> key type the caller expects
   * @param <T2> value type the caller expects
   * @return the stored map, possibly {@code null}
   * @throws ClassCastException if the stored value is not a {@link Map}
   */
  @Nullable
  public <T1, T2> Map<T1, T2> getMap(int idx) {
    return this.<Map<T1, T2>>getValue(idx);
  }



  /**
   * Reads the field at position {@code idx} as a nested {@link Row}.
   *
   * @param idx position of the field
   * @return the stored {@link Row}, possibly {@code null}
   * @throws ClassCastException if the stored value is not a {@link Row}
   */
  @Nullable
  public Row getRow(int idx) {
    return this.<Row>getValue(idx);
  }



  /**
   * Returns the {@link Schema} this row was created with; it describes the row's fields.
   *
   * @return the schema passed to the constructor
   */
  public Schema getSchema() {
    return this.schema;
  }



  /**
   * Compares this row with another object.
   *
   * <p>Two rows are equal when their schemas are equal and every field value is deeply equal
   * according to {@link Equals#deepEquals}, which takes the field's declared type into account
   * (for example byte arrays are compared by content).
   *
   * @param o the object to compare with
   * @return {@code true} if {@code o} is a {@link Row} with an equal schema and equal values
   */
  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (!(o instanceof Row)) {
      return false;
    }

    final Row that = (Row) o;
    final Schema rowSchema = getSchema();
    if (!Objects.equals(rowSchema, that.getSchema())) {
      return false;
    }

    // Schemas match, so both rows are compared position by position with the same field types.
    final int fieldCount = getFieldCount();
    for (int idx = 0; idx < fieldCount; idx++) {
      final Object mine = getValue(idx);
      final Object theirs = that.getValue(idx);
      final boolean sameValue =
          Equals.deepEquals(mine, theirs, rowSchema.getField(idx).getType());
      if (!sameValue) {
        return false;
      }
    }
    return true;
  }



  /**
   * Computes a hash over all field values.
   *
   * <p>Each field contributes {@link Equals#deepHashCode} of its value, combined with the usual
   * {@code 31 * h + x} scheme, so the result is consistent with {@link #equals(Object)}. The
   * schema itself is not mixed into the hash.
   *
   * @return the hash code of this row
   */
  @Override
  public int hashCode() {
    final Schema rowSchema = getSchema();
    final int fieldCount = getFieldCount();
    int result = 1;
    for (int idx = 0; idx < fieldCount; idx++) {
      final Object fieldValue = getValue(idx);
      final int fieldHash = Equals.deepHashCode(fieldValue, rowSchema.getField(idx).getType());
      result = 31 * result + fieldHash;
    }
    return result;
  }



  /**
   * Type-directed deep equality and hashing for field values.
   *
   * <p>The declared {@link Schema.FieldType} selects the comparison: logical types are unwrapped to
   * their base type, byte arrays are compared by content, arrays and maps are compared element by
   * element, and everything else falls back to {@link Objects#equals} / {@link Objects#hashCode}.
   */
  static class Equals {
    /**
     * Deeply compares two values of the given field type.
     *
     * @param a first value, may be {@code null}
     * @param b second value, may be {@code null}
     * @param fieldType declared type of both values
     * @return {@code true} if both are {@code null} or both are deeply equal
     */
    static boolean deepEquals(Object a, Object b, Schema.FieldType fieldType) {
      if (a == null || b == null) {
        // A null only equals another null.
        return a == b;
      }

      final TypeName typeName = fieldType.getTypeName();
      if (typeName == TypeName.LOGICAL_TYPE) {
        return deepEquals(a, b, fieldType.getLogicalType().getBaseType());
      }
      if (typeName == TypeName.BYTES) {
        return Arrays.equals((byte[]) a, (byte[]) b);
      }
      if (typeName == TypeName.ARRAY) {
        return deepEqualsForList(
            (List<Object>) a, (List<Object>) b, fieldType.getCollectionElementType());
      }
      if (typeName == TypeName.MAP) {
        return deepEqualsForMap(
            (Map<Object, Object>) a, (Map<Object, Object>) b, fieldType.getMapValueType());
      }
      return Objects.equals(a, b);
    }

    /**
     * Computes a hash code that is consistent with {@link #deepEquals}.
     *
     * @param a value to hash, may be {@code null}
     * @param fieldType declared type of the value
     * @return {@code 0} for {@code null}, otherwise a type-directed hash
     */
    static int deepHashCode(Object a, Schema.FieldType fieldType) {
      if (a == null) {
        return 0;
      }

      final TypeName typeName = fieldType.getTypeName();
      if (typeName == TypeName.LOGICAL_TYPE) {
        return deepHashCode(a, fieldType.getLogicalType().getBaseType());
      }
      if (typeName == TypeName.BYTES) {
        return Arrays.hashCode((byte[]) a);
      }
      if (typeName == TypeName.ARRAY) {
        return deepHashCodeForList((List<Object>) a, fieldType.getCollectionElementType());
      }
      if (typeName == TypeName.MAP) {
        return deepHashCodeForMap(
            (Map<Object, Object>) a, fieldType.getMapKeyType(), fieldType.getMapValueType());
      }
      return Objects.hashCode(a);
    }

    /**
     * Compares two maps entry by entry. Keys are matched through {@code b.get(key)}; values are
     * compared with {@link #deepEquals} using {@code valueType}.
     *
     * @param a first map
     * @param b second map
     * @param valueType declared type of the map values
     * @return {@code true} if both maps hold deeply equal values under the same keys
     */
    static <K, V> boolean deepEqualsForMap(Map<K, V> a, Map<K, V> b, Schema.FieldType valueType) {
      if (a == b) {
        return true;
      }
      if (a.size() != b.size()) {
        return false;
      }

      for (Entry<K, V> entry : a.entrySet()) {
        final K key = entry.getKey();
        final V mine = entry.getValue();
        final V theirs = b.get(key);

        final boolean matches;
        if (mine == null) {
          // A null value only matches a key that is present in b and also mapped to null.
          matches = theirs == null && b.containsKey(key);
        } else {
          matches = deepEquals(mine, theirs, valueType);
        }
        if (!matches) {
          return false;
        }
      }
      return true;
    }

    /**
     * Hashes a map as the sum over entries of {@code hash(key) ^ hash(value)}, which makes the
     * result independent of iteration order.
     *
     * @param a map to hash
     * @param keyType declared type of the keys
     * @param valueType declared type of the values
     * @return the combined hash
     */
    static int deepHashCodeForMap(
        Map<Object, Object> a, Schema.FieldType keyType, Schema.FieldType valueType) {
      int total = 0;
      for (Entry<Object, Object> entry : a.entrySet()) {
        final int keyHash = deepHashCode(entry.getKey(), keyType);
        final int valueHash = deepHashCode(entry.getValue(), valueType);
        total += keyHash ^ valueHash;
      }
      return total;
    }

    /**
     * Compares two lists position by position with {@link #deepEquals}.
     *
     * @param a first list
     * @param b second list
     * @param elementType declared type of the elements
     * @return {@code true} if both lists have the same size and deeply equal elements
     */
    static boolean deepEqualsForList(List<Object> a, List<Object> b, Schema.FieldType elementType) {
      if (a == b) {
        return true;
      }

      final int size = a.size();
      if (size != b.size()) {
        return false;
      }

      int idx = 0;
      while (idx < size) {
        if (!deepEquals(a.get(idx), b.get(idx), elementType)) {
          return false;
        }
        idx++;
      }
      return true;
    }

    /**
     * Hashes a list with the {@code 31 * h + x} scheme, using {@link #deepHashCode} per element.
     *
     * @param a list to hash
     * @param elementType declared type of the elements
     * @return the combined hash
     */
    static int deepHashCodeForList(List<Object> a, Schema.FieldType elementType) {
      int result = 1;
      for (Object element : a) {
        result = 31 * result + deepHashCode(element, elementType);
      }
      return result;
    }
  }



  /**
   * Renders the row as {@code "Row:"} followed by its values in {@link Arrays#deepToString}
   * format, so nested arrays are printed by content.
   *
   * @return a string representation of the row's values
   */
  @Override
  public String toString() {
    final Object[] fieldValues = Iterables.toArray(getValues(), Object.class);
    return "Row:" + Arrays.deepToString(fieldValues);
  }



  /**
   * Starts building a row for the given schema.
   *
   * <p>When values are added to the returned builder, {@link Builder#build()} throws an
   * {@link IllegalArgumentException} if the number of values differs from the number of fields in
   * the schema.
   *
   * @param schema schema the new row must conform to
   * @return a fresh {@link Builder} bound to {@code schema}
   */
  public static Builder withSchema(Schema schema) {
    final Builder builder = new Builder(schema);
    return builder;
  }



  /**
   * Builder for {@link Row}.
   *
   * <p>A row is assembled from exactly one of two sources: explicit values ({@code addValue},
   * {@code addValues}, {@code addArray}, {@code attachValues}) or a getter factory together with
   * its target object ({@code withFieldValueGetters}). {@link #build()} rejects a builder that was
   * given both.
   */
  public static class Builder {
    private List<Object> values = Lists.newArrayList();
    private boolean attached = false;
    @Nullable private Factory<List<FieldValueGetter>> fieldValueGetterFactory;
    @Nullable private Object getterTarget;
    private final Schema schema;

    Builder(Schema schema) {
      this.schema = schema;
    }

    /**
     * Returns the position the next added value will occupy.
     *
     * @return the number of values added so far
     * @throws UnsupportedOperationException if field value getters have been configured
     */
    public int nextFieldId() {
      if (fieldValueGetterFactory != null) {
        // Still a RuntimeException for existing callers, but now says what is actually wrong.
        throw new UnsupportedOperationException("Not supported");
      }
      return values.size();
    }

    /** Returns the schema this builder was created with. */
    public Schema getSchema() {
      return schema;
    }

    /**
     * Appends a single field value.
     *
     * @param values the value for the next field, may be {@code null}
     * @return this builder
     */
    public Builder addValue(@Nullable Object values) {
      this.values.add(values);
      return this;
    }

    /**
     * Appends every element of the list as a separate field value.
     *
     * @param values values for the next fields, in order
     * @return this builder
     */
    public Builder addValues(List<Object> values) {
      this.values.addAll(values);
      return this;
    }

    /**
     * Appends every argument as a separate field value.
     *
     * @param values values for the next fields, in order
     * @return this builder
     */
    public Builder addValues(Object... values) {
      return addValues(Arrays.asList(values));
    }

    /**
     * Appends the whole list as the value of one (array-typed) field.
     *
     * @param values the list to store in the next field
     * @return this builder
     */
    public <T> Builder addArray(List<T> values) {
      this.values.add(values);
      return this;
    }

    /**
     * Appends the arguments, wrapped in a list, as the value of one (array-typed) field.
     *
     * @param values the elements of the array field
     * @return this builder
     */
    public Builder addArray(Object... values) {
      addArray(Arrays.asList(values));
      return this;
    }

    /**
     * Replaces the builder's values with the given list, used as-is.
     *
     * <p>Values are attached. No verification is done, and no conversions are done. LogicalType
     * values must be specified as the base type.
     *
     * @param values the complete list of field values; the list itself is retained, not copied
     * @return this builder
     */
    public Builder attachValues(List<Object> values) {
      this.attached = true;
      this.values = values;
      return this;
    }

    /**
     * Configures the row to read its fields through getters instead of stored values.
     *
     * @param fieldValueGetterFactory factory for the per-field getters
     * @param getterTarget object the getters are applied to
     * @return this builder
     */
    public Builder withFieldValueGetters(
        Factory<List<FieldValueGetter>> fieldValueGetterFactory, Object getterTarget) {
      this.fieldValueGetterFactory = fieldValueGetterFactory;
      this.getterTarget = getterTarget;
      return this;
    }

    /**
     * Checks all values against the schema and returns the (possibly converted) storage values.
     *
     * @throws IllegalArgumentException if the counts differ, a non-nullable field is null, or a
     *     value has the wrong class for its field type
     */
    private List<Object> verify(Schema schema, List<Object> values) {
      List<Object> verifiedValues = Lists.newArrayListWithCapacity(values.size());
      if (schema.getFieldCount() != values.size()) {
        throw new IllegalArgumentException(
            String.format(
                "Field count in Schema (%s) (%d) and values (%s) (%d)  must match",
                schema.getFieldNames(), schema.getFieldCount(), values, values.size()));
      }
      for (int i = 0; i < values.size(); ++i) {
        Object value = values.get(i);
        Schema.Field field = schema.getField(i);
        if (value == null) {
          if (!field.getType().getNullable()) {
            throw new IllegalArgumentException(
                String.format("Field %s is not nullable", field.getName()));
          }
          verifiedValues.add(null);
        } else {
          verifiedValues.add(verify(value, field.getType(), field.getName()));
        }
      }
      return verifiedValues;
    }

    /** Dispatches verification of one non-null value according to its declared type. */
    private Object verify(Object value, FieldType type, String fieldName) {
      if (TypeName.ARRAY.equals(type.getTypeName())) {
        return verifyArray(value, type.getCollectionElementType(), fieldName);
      } else if (TypeName.MAP.equals(type.getTypeName())) {
        return verifyMap(value, type.getMapKeyType(), type.getMapValueType(), fieldName);
      } else if (TypeName.ROW.equals(type.getTypeName())) {
        return verifyRow(value, fieldName);
      } else if (TypeName.LOGICAL_TYPE.equals(type.getTypeName())) {
        return verifyLogicalType(value, type.getLogicalType(), fieldName);
      } else {
        return verifyPrimitiveType(value, type.getTypeName(), fieldName);
      }
    }

    /** Converts a logical-type value to its base type and verifies the result as that type. */
    private Object verifyLogicalType(Object value, LogicalType logicalType, String fieldName) {
      return verify(logicalType.toBaseType(value), logicalType.getBaseType(), fieldName);
    }

    /** Verifies that the value is a {@link List} and verifies each element into a new list. */
    private List<Object> verifyArray(
        Object value, FieldType collectionElementType, String fieldName) {
      boolean collectionElementTypeNullable = collectionElementType.getNullable();
      if (!(value instanceof List)) {
        throw new IllegalArgumentException(
            String.format(
                "For field name %s and array type expected List class. Instead "
                    + "class type was %s.",
                fieldName, value.getClass()));
      }
      List<Object> valueList = (List<Object>) value;
      List<Object> verifiedList = Lists.newArrayListWithCapacity(valueList.size());
      for (Object listValue : valueList) {
        if (listValue == null) {
          if (!collectionElementTypeNullable) {
            throw new IllegalArgumentException(
                String.format(
                    "%s is not nullable in Array field %s", collectionElementType, fieldName));
          }
          verifiedList.add(null);
        } else {
          verifiedList.add(verify(listValue, collectionElementType, fieldName));
        }
      }
      return verifiedList;
    }

    /** Verifies that the value is a {@link Map} and verifies each key and value into a new map. */
    private Map<Object, Object> verifyMap(
        Object value, FieldType keyType, FieldType valueType, String fieldName) {
      boolean valueTypeNullable = valueType.getNullable();
      if (!(value instanceof Map)) {
        throw new IllegalArgumentException(
            String.format(
                "For field name %s and map type expected Map class. Instead "
                    + "class type was %s.",
                fieldName, value.getClass()));
      }
      Map<Object, Object> valueMap = (Map<Object, Object>) value;
      Map<Object, Object> verifiedMap = Maps.newHashMapWithExpectedSize(valueMap.size());
      for (Entry<Object, Object> kv : valueMap.entrySet()) {
        if (kv.getValue() == null) {
          if (!valueTypeNullable) {
            throw new IllegalArgumentException(
                String.format("%s is not nullable in Map field %s", valueType, fieldName));
          }
          verifiedMap.put(verify(kv.getKey(), keyType, fieldName), null);
        } else {
          verifiedMap.put(
              verify(kv.getKey(), keyType, fieldName), verify(kv.getValue(), valueType, fieldName));
        }
      }
      return verifiedMap;
    }

    /** Verifies that the value is a {@link Row} and returns it unchanged. */
    private Row verifyRow(Object value, String fieldName) {
      if (!(value instanceof Row)) {
        throw new IllegalArgumentException(
            String.format(
                "For field name %s expected Row type. " + "Instead class type was %s.",
                fieldName, value.getClass()));
      }
      // No need to recursively validate the nested Row, since there's no way to build the
      // Row object without it validating.
      return (Row) value;
    }

    /**
     * Verifies a value of a primitive (non-composite) type and returns the value to store.
     *
     * <p>Date types are normalised to {@link Instant}; a {@link ByteBuffer} supplied for a BYTES
     * field is converted to a {@code byte[]}; all other accepted values are returned unchanged.
     *
     * @throws IllegalArgumentException if the value's class does not fit the type
     */
    private Object verifyPrimitiveType(Object value, TypeName type, String fieldName) {
      if (type.isDateType()) {
        return verifyDateTime(value, fieldName);
      } else {
        switch (type) {
          case BYTE:
            if (value instanceof Byte) {
              return value;
            }
            break;
          case BYTES:
            if (value instanceof ByteBuffer) {
              return toByteArray((ByteBuffer) value);
            } else if (value instanceof byte[]) {
              return (byte[]) value;
            }
            break;
          case INT16:
            if (value instanceof Short) {
              return value;
            }
            break;
          case INT32:
            if (value instanceof Integer) {
              return value;
            }
            break;
          case INT64:
            if (value instanceof Long) {
              return value;
            }
            break;
          case DECIMAL:
            if (value instanceof BigDecimal) {
              return value;
            }
            break;
          case FLOAT:
            if (value instanceof Float) {
              return value;
            }
            break;
          case DOUBLE:
            if (value instanceof Double) {
              return value;
            }
            break;
          case STRING:
            if (value instanceof String) {
              return value;
            }
            break;
          case BOOLEAN:
            if (value instanceof Boolean) {
              return value;
            }
            break;
          default:
            // Shouldn't actually get here, but we need this case to satisfy linters.
            throw new IllegalArgumentException(
                String.format("Not a primitive type for field name %s: %s", fieldName, type));
        }
        throw new IllegalArgumentException(
            String.format(
                "For field name %s and type %s found incorrect class type %s",
                fieldName, type, value.getClass()));
      }
    }

    /**
     * Extracts the bytes of a buffer supplied for a BYTES field.
     *
     * <p>Buffers with an accessible backing array keep the historical result: the backing array
     * itself. Direct and read-only buffers have no accessible array, and calling
     * {@link ByteBuffer#array()} on them throws, so their readable bytes (position to limit) are
     * copied instead. The caller's buffer position is left untouched.
     *
     * <p>NOTE(review): for array-backed buffers the whole backing array is returned regardless of
     * position, limit and array offset; this is kept for backward compatibility.
     */
    private static byte[] toByteArray(ByteBuffer buffer) {
      if (buffer.hasArray()) {
        return buffer.array();
      }
      // Read through a duplicate so the caller's position and limit are not disturbed.
      ByteBuffer readView = buffer.duplicate();
      byte[] bytes = new byte[readView.remaining()];
      readView.get(bytes);
      return bytes;
    }

    /** Verifies that the value is a Joda {@link AbstractInstant} and converts it to an Instant. */
    private Instant verifyDateTime(Object value, String fieldName) {
      // We support the following classes for datetimes.
      if (value instanceof AbstractInstant) {
        return ((AbstractInstant) value).toInstant();
      } else {
        throw new IllegalArgumentException(
            String.format(
                "For field name %s and DATETIME type got unexpected class %s ",
                fieldName, value.getClass()));
      }
    }

    /**
     * Builds the row.
     *
     * <p>If values were supplied they are verified against the schema (unless they were attached
     * with {@link #attachValues}) and stored in the row. Otherwise, if getters were configured,
     * the row reads its fields through them. If neither was supplied, a row with empty storage is
     * returned without checking the schema's field count.
     *
     * @return the new {@link Row}
     * @throws IllegalArgumentException if both values and getters were specified, or if value
     *     verification fails
     * @throws IllegalStateException if a getter target is set without getters being used, or
     *     getters are used without a target
     */
    public Row build() {
      checkNotNull(schema);
      if (!this.values.isEmpty() && fieldValueGetterFactory != null) {
        throw new IllegalArgumentException("Cannot specify both values and getters.");
      }
      if (!this.values.isEmpty()) {
        List<Object> storageValues = attached ? this.values : verify(schema, this.values);
        checkState(getterTarget == null, "withGetterTarget requires getters.");
        return new RowWithStorage(schema, storageValues);
      } else if (fieldValueGetterFactory != null) {
        checkState(getterTarget != null, "getters require withGetterTarget.");
        return new RowWithGetters(schema, fieldValueGetterFactory, getterTarget);
      } else {
        return new RowWithStorage(schema, Collections.emptyList());
      }
    }
  }



  /**
   * Returns a {@link Collector} that gathers stream elements into a {@link Row}.
   *
   * <p>Elements are accumulated in encounter order into a list, and the finished list is passed
   * to {@code Row.withSchema(schema).addValues(...).build()}, so each element becomes one field
   * value.
   *
   * @param schema schema of the row to create
   * @param <T> type of the collected elements
   * @return a collector producing a row for {@code schema}
   */
  public static <T> Collector<T, List<Object>, Row> toRow(Schema schema) {
    return Collector.<T, List<Object>, Row>of(
        () -> new ArrayList<>(schema.getFieldCount()),
        (collected, element) -> collected.add(element),
        (first, second) -> {
          first.addAll(second);
          return first;
        },
        collected -> {
          final Builder builder = Row.withSchema(schema);
          return builder.addValues(collected).build();
        });
  }



  /**
   * Creates a new record whose fields are all {@code null}.
   *
   * <p>One {@code null} is added per field of the schema and the row is built through the
   * regular {@link Builder}.
   *
   * @param schema schema of the row to create
   * @return a row for {@code schema} filled with nulls
   */
  public static Row nullRow(Schema schema) {
    final List<Object> allNulls = Collections.nCopies(schema.getFieldCount(), null);
    final Builder builder = Row.withSchema(schema);
    return builder.addValues(allNulls).build();
  }

}